// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use derive_builder::Builder;
use serde::{Deserialize, Serialize};

use crate::storage::{ColumnDefaultConstraint, ColumnSchema, ConcreteDataType};

/// Id of column. Unique in each region.
pub type ColumnId = u32;
/// Group number of one region. Unique in each region.
pub type RegionGroup = u8;
/// Sequence number of region inside one table. Unique in each table.
/// Only the lowest 24 bits are usable (see [MAX_REGION_SEQ]); the highest 8 bits
/// of the `u32` are reserved for [RegionGroup].
pub type RegionSeq = u32;
/// Id of regions under the same table. Unique in each table.
/// It is composed of [RegionGroup] (highest 8 bits) and [RegionSeq] (lowest 24 bits).
pub type RegionNumber = u32;
/// Id of table. Universally unique.
pub type TableId = u32;

/// Bit mask selecting the highest 8 bits of a [RegionNumber], i.e. the [RegionGroup].
const REGION_GROUP_MASK: u32 = 0b1111_1111 << 24;
/// Bit mask selecting the lowest 24 bits of a [RegionNumber], i.e. the [RegionSeq].
const REGION_SEQ_MASK: u32 = (0b1 << 24) - 1;

/// The max valid region sequence number.
pub const MAX_REGION_SEQ: u32 = REGION_SEQ_MASK;

/// Id of the region. It's generated by concatenating table id, region group and region
/// sequence. The region group and the region sequence together form the region number.
///
/// ```plaintext
/// 63                                  31         23                  0
/// ┌────────────────────────────────────┬──────────┬──────────────────┐
/// │          Table Id(32)              │ Group(8) │   Sequence(24)   │
/// └────────────────────────────────────┴──────────┴──────────────────┘
///                                         Region Number(32)
/// ```
#[derive(Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct RegionId(u64);

impl RegionId {
    /// Construct a new [RegionId] from table id and region number.
    pub const fn new(table_id: TableId, region_number: RegionNumber) -> RegionId {
        RegionId(((table_id as u64) << 32) | region_number as u64)
    }

    /// Returns the table id of the region.
    pub const fn table_id(&self) -> TableId {
        (self.0 >> 32) as TableId
    }

    /// Returns the region number of the region.
    pub const fn region_number(&self) -> RegionNumber {
        self.0 as RegionNumber
    }

    /// Returns the group number of the region
    pub const fn region_group(&self) -> RegionGroup {
        ((self.region_number() & REGION_GROUP_MASK) >> 24) as RegionGroup
    }

    /// Return the sequence number of the region
    pub const fn region_sequence(&self) -> RegionSeq {
        self.region_number() & REGION_SEQ_MASK
    }

    /// Returns the region id as u64.
    pub const fn as_u64(&self) -> u64 {
        self.0
    }

    /// Construct a new [RegionId] from u64.
    pub const fn from_u64(id: u64) -> RegionId {
        RegionId(id)
    }

    /// Construct a new [RegionId] from table id, region group and region sequence.
    pub const fn with_group_and_seq(
        table_id: TableId,
        group: RegionGroup,
        seq: RegionSeq,
    ) -> RegionId {
        RegionId(
            ((table_id as u64) << 32)
                | ((group as u32) << 24) as u64
                | (seq & REGION_SEQ_MASK) as u64,
        )
    }
}

impl fmt::Debug for RegionId {
    /// Formats the id as `<raw u64>(<table id>, <region number>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let raw = self.0;
        let table_id = self.table_id();
        let region_number = self.region_number();
        write!(f, "{}({}, {})", raw, table_id, region_number)
    }
}

impl fmt::Display for RegionId {
    /// Renders the id exactly like its [fmt::Debug] representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

impl From<u64> for RegionId {
    fn from(region_id: u64) -> RegionId {
        RegionId::from_u64(region_id)
    }
}

impl From<RegionId> for u64 {
    /// Unwraps the [RegionId] into its raw `u64` value.
    fn from(region_id: RegionId) -> u64 {
        region_id.0
    }
}

impl PartialEq<u64> for RegionId {
    /// Compares the raw value of the region id with a plain `u64`.
    fn eq(&self, other: &u64) -> bool {
        self.as_u64() == *other
    }
}

impl PartialEq<RegionId> for u64 {
    /// Compares a plain `u64` with the raw value of the region id.
    fn eq(&self, other: &RegionId) -> bool {
        *self == other.as_u64()
    }
}

/// A [ColumnDescriptor] contains information to create a column.
///
/// Instances are created through [ColumnDescriptorBuilder]; its validation rejects an
/// empty name and a default constraint that does not pass
/// `ColumnDefaultConstraint::validate` for the column's data type and nullability.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder)]
#[builder(pattern = "owned", build_fn(validate = "Self::validate"))]
pub struct ColumnDescriptor {
    /// Id of the column.
    pub id: ColumnId,
    /// Name of the column. The builder rejects an empty name.
    #[builder(setter(into))]
    pub name: String,
    /// Data type of the column.
    pub data_type: ConcreteDataType,
    /// Is column nullable, default is true.
    #[builder(default = "true")]
    is_nullable: bool,
    /// Is time index column, default is false.
    #[builder(default = "false")]
    is_time_index: bool,
    /// Default constraint of column, default is None, which means no default constraint
    /// for this column, and user must provide a value for a not-null column.
    #[builder(default)]
    default_constraint: Option<ColumnDefaultConstraint>,
    /// Comment of the column, default is an empty string.
    #[builder(default, setter(into))]
    pub comment: String,
}

impl ColumnDescriptor {
    /// Returns whether the column is nullable.
    #[inline]
    pub fn is_nullable(&self) -> bool {
        self.is_nullable
    }

    /// Returns whether the column is a time index column.
    #[inline]
    pub fn is_time_index(&self) -> bool {
        self.is_time_index
    }

    /// Returns a reference to the default constraint, or `None` if the column has none.
    #[inline]
    pub fn default_constraint(&self) -> Option<&ColumnDefaultConstraint> {
        self.default_constraint.as_ref()
    }

    /// Builds a [ColumnSchema] from the name, data type, nullability, time index flag and
    /// default constraint of this descriptor. Fields that [ColumnSchema] does not have
    /// **will not** be stored as metadata.
    ///
    /// # Panics
    ///
    /// Panics if [ColumnSchema] rejects the default constraint.
    pub fn to_column_schema(&self) -> ColumnSchema {
        let constraint = self.default_constraint().cloned();
        let schema = ColumnSchema::new(&self.name, self.data_type.clone(), self.is_nullable())
            .with_time_index(self.is_time_index());

        schema
            .with_default_constraint(constraint)
            .expect("ColumnDescriptor should validate default constraint")
    }
}

impl ColumnDescriptorBuilder {
    /// Creates a builder with the mandatory `id`, `name` and `data_type` already set;
    /// every other field keeps its builder default.
    pub fn new<S: Into<String>>(id: ColumnId, name: S, data_type: ConcreteDataType) -> Self {
        Self::default().id(id).name(name).data_type(data_type)
    }

    /// Validation hook invoked by the generated `build()`.
    ///
    /// Fails when the name is set but empty, or when a default constraint is set and
    /// does not pass `ColumnDefaultConstraint::validate` for the data type and nullability.
    fn validate(&self) -> Result<(), String> {
        if matches!(self.name.as_deref(), Some("")) {
            return Err("name should not be empty".to_string());
        }

        match (&self.default_constraint, &self.data_type) {
            (Some(Some(constraint)), Some(data_type)) => {
                // Keep this fallback in sync with the `#[builder(default = "true")]`
                // attribute declared on `ColumnDescriptor::is_nullable`.
                let nullable = self.is_nullable.unwrap_or(true);

                constraint
                    .validate(data_type, nullable)
                    .map_err(|err| err.to_string())?;
                Ok(())
            }
            // Nothing to check without both a constraint and a data type.
            _ => Ok(()),
        }
    }
}

#[cfg(test)]
mod tests {
    use datatypes::value::Value;

    use super::*;

    /// Returns a builder for a nullable `int32` column named `test` with id 3.
    #[inline]
    fn new_column_desc_builder() -> ColumnDescriptorBuilder {
        ColumnDescriptorBuilder::new(3, "test", ConcreteDataType::int32_datatype())
    }

    /// Covers builder defaults, every setter and the validation failures.
    #[test]
    fn test_column_descriptor_builder() {
        let desc = new_column_desc_builder().build().unwrap();
        assert_eq!(3, desc.id);
        assert_eq!("test", desc.name);
        assert_eq!(ConcreteDataType::int32_datatype(), desc.data_type);
        assert!(desc.is_nullable);
        // A column is not a time index unless explicitly requested.
        assert!(!desc.is_time_index());
        assert!(desc.default_constraint.is_none());
        assert!(desc.comment.is_empty());

        let desc = new_column_desc_builder()
            .is_nullable(false)
            .build()
            .unwrap();
        assert!(!desc.is_nullable());

        let desc = new_column_desc_builder()
            .is_time_index(true)
            .build()
            .unwrap();
        assert!(desc.is_time_index());

        let desc = new_column_desc_builder()
            .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null)))
            .build()
            .unwrap();
        assert_eq!(
            ColumnDefaultConstraint::Value(Value::Null),
            *desc.default_constraint().unwrap()
        );

        let desc = new_column_desc_builder()
            .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Int32(123))))
            .build()
            .unwrap();
        assert_eq!(
            ColumnDefaultConstraint::Value(Value::Int32(123)),
            desc.default_constraint.unwrap()
        );

        let desc = new_column_desc_builder()
            .comment("A test column")
            .build()
            .unwrap();
        assert_eq!("A test column", desc.comment);

        // A null default value is invalid for a not-null column.
        assert!(new_column_desc_builder()
            .is_nullable(false)
            .default_constraint(Some(ColumnDefaultConstraint::Value(Value::Null)))
            .build()
            .is_err());

        // An empty column name is rejected by the builder validation.
        assert!(
            ColumnDescriptorBuilder::new(3, "", ConcreteDataType::int32_datatype())
                .build()
                .is_err()
        );
    }

    /// The converted schema carries name, type, nullability and default constraint.
    #[test]
    fn test_descriptor_to_column_schema() {
        let constraint = ColumnDefaultConstraint::Value(Value::Int32(123));
        let desc = new_column_desc_builder()
            .default_constraint(Some(constraint.clone()))
            .is_nullable(false)
            .build()
            .unwrap();
        let column_schema = desc.to_column_schema();
        let expected = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
            .with_default_constraint(Some(constraint))
            .unwrap();

        assert_eq!(expected, column_schema);
    }

    /// Checks the `u64` layout, the conversions and the textual representation.
    #[test]
    fn test_region_id() {
        assert_eq!(RegionId::new(0, 1), 1);
        assert_eq!(4294967296, RegionId::new(1, 0));
        assert_eq!(4294967297, RegionId::new(1, 1));
        assert_eq!(4294967396, RegionId::new(1, 100));
        assert_eq!(8589934602, RegionId::new(2, 10));
        assert_eq!(18446744069414584330, RegionId::new(u32::MAX, 10));

        let region_id = RegionId::new(u32::MAX, 1);
        assert_eq!(u32::MAX, region_id.table_id());
        assert_eq!(1, region_id.region_number());
        let inner: u64 = region_id.into();
        assert_eq!(RegionId::from(inner), region_id);

        let region_id = RegionId::new(1234, 5);
        assert_eq!("5299989643269(1234, 5)", region_id.to_string());
        assert_eq!("5299989643269(1234, 5)", format!("{:?}", region_id));
    }

    /// A region id is serialized as its bare `u64` value.
    #[test]
    fn test_region_id_to_json() {
        let region_id = RegionId::from(4294967297);
        let json = serde_json::to_string(&region_id).unwrap();
        assert_eq!("4294967297", json);

        let parsed: RegionId = serde_json::from_str(&json).unwrap();
        assert_eq!(region_id, parsed);
    }

    /// Group and sequence round-trip through the packed region number.
    #[test]
    fn test_retrieve_region_group_and_seq() {
        let region_id = RegionId::with_group_and_seq(111, 222, 333);
        assert_eq!(111, region_id.table_id());
        assert_eq!(222, region_id.region_group());
        assert_eq!(333, region_id.region_sequence());

        let expected_region_number = 222 << 24 | 333;
        assert_eq!(expected_region_number, region_id.region_number());

        // Upper boundary: every field at its maximum fills all 64 bits.
        let region_id = RegionId::with_group_and_seq(u32::MAX, u8::MAX, MAX_REGION_SEQ);
        assert_eq!(u32::MAX, region_id.table_id());
        assert_eq!(u8::MAX, region_id.region_group());
        assert_eq!(MAX_REGION_SEQ, region_id.region_sequence());
        assert_eq!(u32::MAX, region_id.region_number());
        assert_eq!(u64::MAX, region_id);
    }

    /// An oversized sequence is truncated and must not corrupt the other fields.
    #[test]
    fn test_invalid_large_region_sequence() {
        // region sequence larger than `MAX_REGION_SEQ` will be masked into valid range
        let region_id = RegionId::with_group_and_seq(111, 222, u32::MAX);
        assert_eq!(MAX_REGION_SEQ, region_id.region_sequence());
        // The overflowing bits are dropped instead of leaking into group or table id.
        assert_eq!(222, region_id.region_group());
        assert_eq!(111, region_id.table_id());
    }
}
